Purpose: Decode AIS message (payload) into a readable format¶
Output: Separated by fields¶
In [4]:
"""
AIS Message Decoder for Final Year Project
Purpose: Decode different types of AIS messages from NMEA format
Output: TXT format with all the navigation fields
Reference: https://gpsd.gitlab.io/gpsd/AIVDM.html#_json_ais_encoding
"""
import sys
# convert AIS character to 6-bit value
def convert_ais_char(char):
ascii_value = ord(char)
val = ascii_value - 48
if val > 40:
val = val - 8
return val
# extract bits from binary string
def extract_bits(binary_data, start_pos, bit_length):
if start_pos + bit_length > len(binary_data):
return None
bit_string = binary_data[start_pos:start_pos + bit_length]
return int(bit_string, 2)
# extract signed bits (for negative values)
def extract_signed_bits(binary_data, start_pos, bit_length):
value = extract_bits(binary_data, start_pos, bit_length)
if value is None:
return None
# check if negative (MSB is 1)
if value >= (1 << (bit_length - 1)):
value = value - (1 << bit_length)
return value
# convert payload to binary
def convert_payload_to_binary(payload):
result = ""
for char in payload:
six_bit_val = convert_ais_char(char)
binary_str = format(six_bit_val, '06b')
result += binary_str
return result
# parse NMEA sentence to get payload
def get_payload_from_nmea(sentence):
if not sentence.startswith('!AIVDM') and not sentence.startswith('!AIVDO'):
return None
parts = sentence.strip().split(',')
if len(parts) >= 6:
return parts[5]
return None
# format coordinates with hemisphere
def format_lat_lon(coordinate, is_lon=True):
if coordinate is None:
return None, None
if is_lon:
if coordinate >= 0:
hemisphere = 'E'
else:
hemisphere = 'W'
else: # latitude
if coordinate >= 0:
hemisphere = 'N'
else:
hemisphere = 'S'
return abs(coordinate), hemisphere
# main decode function
def decode_ais(nmea_sentence):
payload = get_payload_from_nmea(nmea_sentence)
if not payload:
return None
binary_data = convert_payload_to_binary(payload)
if len(binary_data) < 38:
return None
# get basic message info
msg_type = extract_bits(binary_data, 0, 6)
repeat_ind = extract_bits(binary_data, 6, 2)
mmsi = extract_bits(binary_data, 8, 30)
# initialize all variables
nav_status = None
rot = None
sog = None
pos_accuracy = None
longitude = None
latitude = None
lat_hem = None
lon_hem = None
cog = None
heading = None
utc_sec = None
raim = None
sync = None
slot = None
# decode based on message type
if msg_type in [1, 2, 3] and len(binary_data) >= 168:
# Class A position reports
nav_status = extract_bits(binary_data, 38, 4)
# Rate of turn calculation - GPSD standard
rot_raw = extract_signed_bits(binary_data, 42, 8)
if rot_raw == -128:
rot = "-128.0" # Not available (default)
elif rot_raw == -127:
rot = "-720.0" # Turning left at more than 5°/30s
elif rot_raw == 127:
rot = "+127.0" # Turning right at more than 5°/30s
elif rot_raw is not None:
if rot_raw == 0:
rot = "+0.0"
else:
# GPSD formula: ROT degrees/min = (ROT_raw/4.733)^2 * sign(ROT_raw)
if rot_raw > 0:
rot_degrees = (rot_raw / 4.733) ** 2
rot = f"+{rot_degrees:.1f}"
else:
rot_degrees = (abs(rot_raw) / 4.733) ** 2
rot = f"-{rot_degrees:.1f}"
else:
rot = "+0.0"
# Speed over ground
sog_raw = extract_bits(binary_data, 50, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 60, 1)
# Position decoding
lon_raw = extract_signed_bits(binary_data, 61, 28)
lat_raw = extract_signed_bits(binary_data, 89, 27)
# check for valid longitude
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
# check for valid latitude
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
# Course over ground
cog_raw = extract_bits(binary_data, 116, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
# True heading
hdg_raw = extract_bits(binary_data, 128, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
# UTC second
utc_raw = extract_bits(binary_data, 137, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
# Communication state
raim = extract_bits(binary_data, 148, 1)
sync = extract_bits(binary_data, 149, 2)
slot = extract_bits(binary_data, 151, 3)
elif msg_type == 4 and len(binary_data) >= 168:
# Base station report
pos_accuracy = extract_bits(binary_data, 78, 1)
lon_raw = extract_signed_bits(binary_data, 79, 28)
lat_raw = extract_signed_bits(binary_data, 107, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
raim = extract_bits(binary_data, 148, 1)
elif msg_type == 9 and len(binary_data) >= 168:
# SAR aircraft position
sog_raw = extract_bits(binary_data, 50, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 60, 1)
lon_raw = extract_signed_bits(binary_data, 61, 28)
lat_raw = extract_signed_bits(binary_data, 89, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 116, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
utc_sec = extract_bits(binary_data, 128, 6)
raim = extract_bits(binary_data, 147, 1)
elif msg_type == 18 and len(binary_data) >= 168:
# Class B position report
sog_raw = extract_bits(binary_data, 46, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 56, 1)
lon_raw = extract_signed_bits(binary_data, 57, 28)
lat_raw = extract_signed_bits(binary_data, 85, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 112, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
hdg_raw = extract_bits(binary_data, 124, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
utc_raw = extract_bits(binary_data, 133, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
raim = extract_bits(binary_data, 147, 1)
sync = extract_bits(binary_data, 149, 2)
slot = extract_bits(binary_data, 151, 3)
elif msg_type == 19 and len(binary_data) >= 312:
# Extended Class B
sog_raw = extract_bits(binary_data, 46, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 56, 1)
lon_raw = extract_signed_bits(binary_data, 57, 28)
lat_raw = extract_signed_bits(binary_data, 85, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 112, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
hdg_raw = extract_bits(binary_data, 124, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
utc_raw = extract_bits(binary_data, 133, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
raim = extract_bits(binary_data, 305, 1)
elif msg_type == 21 and len(binary_data) >= 272:
# Aid to navigation
pos_accuracy = extract_bits(binary_data, 163, 1)
lon_raw = extract_signed_bits(binary_data, 164, 28)
lat_raw = extract_signed_bits(binary_data, 192, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
utc_sec = extract_bits(binary_data, 253, 6)
raim = extract_bits(binary_data, 268, 1)
elif msg_type == 27 and len(binary_data) >= 96:
# Long range AIS
pos_accuracy = extract_bits(binary_data, 38, 1)
raim = extract_bits(binary_data, 39, 1)
nav_status = extract_bits(binary_data, 40, 4)
lon_raw = extract_signed_bits(binary_data, 44, 18)
lat_raw = extract_signed_bits(binary_data, 62, 17)
if lon_raw != 0x1A838:
lon_degrees = lon_raw / 600.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0xD548:
lat_degrees = lat_raw / 600.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
sog_raw = extract_bits(binary_data, 79, 6)
if sog_raw == 63:
sog = "0.0"
else:
sog = f"{sog_raw:.1f}"
cog_raw = extract_bits(binary_data, 85, 9)
if cog_raw >= 360:
cog = "360.0"
else:
cog = f"{cog_raw:.1f}"
# return decoded values in correct order
return [
msg_type,
repeat_ind,
mmsi,
nav_status,
rot,
sog,
pos_accuracy,
longitude,
lon_hem,
latitude,
lat_hem,
cog,
heading,
utc_sec,
sync,
slot,
raim
]
# convert values to CSV format
def make_csv_line(decoded_values):
if not decoded_values:
return None
csv_parts = []
for value in decoded_values:
if value is None:
csv_parts.append('0')
else:
csv_parts.append(str(value))
return ','.join(csv_parts)
# process the input file
def process_ais_file(input_filename, output_filename=None):
if output_filename is None:
output_filename = input_filename.replace('.txt', '_decoded.txt')
try:
input_file = open(input_filename, 'r')
output_file = open(output_filename, 'w')
# write header
header = "message_type,repeat_indicator,mmsi,navigation_status,rate_of_turn,speed_over_ground,position_accuracy,longitude,lon_hemisphere,latitude,lat_hemisphere,course_over_ground,true_heading,utc_second,sync_state,slot_timeout,raim_flag"
output_file.write(header + '\n')
total_messages = 0
decoded_messages = 0
message_types = {}
messages_with_position = 0
for line in input_file:
line = line.strip()
if not line:
continue
total_messages += 1
result = decode_ais(line)
if result:
msg_type = result[0]
if msg_type in message_types:
message_types[msg_type] += 1
else:
message_types[msg_type] = 1
# check if message has position data
if result[7] is not None and result[9] is not None:
messages_with_position += 1
csv_line = make_csv_line(result)
if csv_line:
output_file.write(csv_line + '\n')
decoded_messages += 1
input_file.close()
output_file.close()
# print summary
print(f"Total messages processed: {total_messages}")
print(f"Successfully decoded: {decoded_messages}")
print(f"Messages with position data: {messages_with_position}")
print(f"\nMessage type summary:")
for msg_type in sorted(message_types.keys()):
print(f" Type {msg_type}: {message_types[msg_type]} messages")
print(f"\nDecoded data saved to: {output_filename}")
except FileNotFoundError:
print(f"Error: Could not find file {input_filename}")
except Exception as error:
print(f"Error occurred: {error}")
# debug function to test single message
def debug_single_message(nmea_msg):
payload = get_payload_from_nmea(nmea_msg)
if not payload:
print("Could not parse NMEA message")
return
binary = convert_payload_to_binary(payload)
print(f"Payload: {payload}")
print(f"Binary length: {len(binary)} bits")
msg_type = extract_bits(binary, 0, 6)
repeat = extract_bits(binary, 6, 2)
mmsi = extract_bits(binary, 8, 30)
print(f"Message type: {msg_type}")
print(f"Repeat: {repeat}")
print(f"MMSI: {mmsi}")
if msg_type in [1, 2, 3]:
print("Class A position report detected")
nav_stat = extract_bits(binary, 38, 4)
rot_raw = extract_signed_bits(binary, 42, 8)
sog_raw = extract_bits(binary, 50, 10)
pos_acc = extract_bits(binary, 60, 1)
lon_raw = extract_signed_bits(binary, 61, 28)
lat_raw = extract_signed_bits(binary, 89, 27)
print(f"Navigation status: {nav_stat}")
print(f"ROT raw: {rot_raw}")
print(f"SOG raw: {sog_raw}")
print(f"Position accuracy: {pos_acc}")
print(f"Longitude raw: {lon_raw}")
print(f"Latitude raw: {lat_raw}")
# main program
if __name__ == "__main__":
# test with sample message
test_nmea = "!AIVDM,1,1,,A,38IFDN0Ohj7JvbN0fABtpbJ401w@,0*69"
print("Testing decoder with sample message:")
print(f"Input: {test_nmea}")
debug_single_message(test_nmea)
decoded = decode_ais(test_nmea)
if decoded:
print(f"Decoded result: {make_csv_line(decoded)}")
else:
print("Decoding failed!")
print("\n" + "="*60 + "\n")
# process full file
input_path = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\L4_All_AIS_Messages.txt"
output_path = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\L4_NMEA_Decoded.txt"
print(f"Processing AIS messages from: {input_path}")
process_ais_file(input_path, output_path)
Testing decoder with sample message: Input: !AIVDM,1,1,,A,38IFDN0Ohj7JvbN0fABtpbJ401w@,0*69 Payload: 38IFDN0Ohj7JvbN0fABtpbJ401w@ Binary length: 168 bits Message type: 3 Repeat: 0 MMSI: 563451000 Class A position report detected Navigation status: 0 ROT raw: 127 SOG raw: 50 Position accuracy: 0 Longitude raw: 62256463 Latitude raw: 758091 Decoded result: 3,0,563451000,0,+127.0,5.0,0,103.7607717,E,1.2634850,N,329.8,333,2,0,0,0 ============================================================ Processing AIS messages from: C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\L4_All_AIS_Messages.txt Total messages processed: 478 Successfully decoded: 478 Messages with position data: 467 Message type summary: Type 1: 360 messages Type 3: 115 messages Type 4: 3 messages Decoded data saved to: C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\L4_NMEA_Decoded.txt
Mapping lat long with impt info¶
In [9]:
import pandas as pd
import folium
from IPython.display import display
# Load data
decoded_file = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\L4_NMEA_Decoded.txt"
df = pd.read_csv(decoded_file)
# convert to numeric and fix hemisphere if needed
df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
if "lon_hemisphere" in df.columns:
df.loc[df["lon_hemisphere"].fillna("").str.upper() == "W", "longitude"] *= -1
if "lat_hemisphere" in df.columns:
df.loc[df["lat_hemisphere"].fillna("").str.upper() == "S", "latitude"] *= -1
# drop invalid coords
df = df.dropna(subset=["longitude", "latitude"])
df = df[(df["longitude"] != 0) & (df["latitude"] != 0)]
# keep last message per MMSI (latest timestamp)
df_plot = df.drop_duplicates(subset=["mmsi"], keep="last").copy()
# center map
center_lat, center_lon = df_plot["latitude"].mean(), df_plot["longitude"].mean()
m = folium.Map(location=[center_lat, center_lon], zoom_start=10)
# add markers with popup
for _, row in df_plot.iterrows():
popup_html = (
f"<b>MMSI:</b> {row['mmsi']}<br>"
f"<b>SOG:</b> {row.get('speed_over_ground','')} kn<br>"
f"<b>COG:</b> {row.get('course_over_ground','')}°<br>"
f"<b>Heading:</b> {row.get('true_heading','')}°<br>"
f"<b>Lat:</b> {row['latitude']:.5f}°<br>"
f"<b>Lon:</b> {row['longitude']:.5f}°"
)
folium.CircleMarker(
location=[row["latitude"], row["longitude"]],
radius=5,
color="blue",
fill=True,
fill_opacity=0.8,
popup=folium.Popup(popup_html, max_width=300)
).add_to(m)
display(m)
# can save as html ltr on if needed:
# m.save("ais_map.html")
Make this Notebook Trusted to load map: File -> Trust Notebook
In [1]:
"""
AIS Message Decoder for Final Year Project
Purpose: Decode different types of AIS messages from NMEA format
Output: TXT format with all the navigation fields
Reference: https://gpsd.gitlab.io/gpsd/AIVDM.html#_json_ais_encoding
"""
import sys
# convert AIS character to 6-bit value
def convert_ais_char(char):
ascii_value = ord(char)
val = ascii_value - 48
if val > 40:
val = val - 8
return val
# extract bits from binary string
def extract_bits(binary_data, start_pos, bit_length):
if start_pos + bit_length > len(binary_data):
return None
bit_string = binary_data[start_pos:start_pos + bit_length]
return int(bit_string, 2)
# extract signed bits (for negative values)
def extract_signed_bits(binary_data, start_pos, bit_length):
value = extract_bits(binary_data, start_pos, bit_length)
if value is None:
return None
# check if negative (MSB is 1)
if value >= (1 << (bit_length - 1)):
value = value - (1 << bit_length)
return value
# convert payload to binary
def convert_payload_to_binary(payload):
result = ""
for char in payload:
six_bit_val = convert_ais_char(char)
binary_str = format(six_bit_val, '06b')
result += binary_str
return result
# parse NMEA sentence to get payload
def get_payload_from_nmea(sentence):
if not sentence.startswith('!AIVDM') and not sentence.startswith('!AIVDO'):
return None
parts = sentence.strip().split(',')
if len(parts) >= 6:
return parts[5]
return None
# format coordinates with hemisphere
def format_lat_lon(coordinate, is_lon=True):
if coordinate is None:
return None, None
if is_lon:
if coordinate >= 0:
hemisphere = 'E'
else:
hemisphere = 'W'
else: # latitude
if coordinate >= 0:
hemisphere = 'N'
else:
hemisphere = 'S'
return abs(coordinate), hemisphere
# main decode function
def decode_ais(nmea_sentence):
payload = get_payload_from_nmea(nmea_sentence)
if not payload:
return None
binary_data = convert_payload_to_binary(payload)
if len(binary_data) < 38:
return None
# get basic message info
msg_type = extract_bits(binary_data, 0, 6)
repeat_ind = extract_bits(binary_data, 6, 2)
mmsi = extract_bits(binary_data, 8, 30)
# initialize all variables
nav_status = None
rot = None
sog = None
pos_accuracy = None
longitude = None
latitude = None
lat_hem = None
lon_hem = None
cog = None
heading = None
utc_sec = None
raim = None
sync = None
slot = None
# decode based on message type
if msg_type in [1, 2, 3] and len(binary_data) >= 168:
# Class A position reports
nav_status = extract_bits(binary_data, 38, 4)
# Rate of turn calculation - GPSD standard
rot_raw = extract_signed_bits(binary_data, 42, 8)
if rot_raw == -128:
rot = "-128.0" # Not available (default)
elif rot_raw == -127:
rot = "-720.0" # Turning left at more than 5°/30s
elif rot_raw == 127:
rot = "+127.0" # Turning right at more than 5°/30s
elif rot_raw is not None:
if rot_raw == 0:
rot = "+0.0"
else:
# GPSD formula: ROT degrees/min = (ROT_raw/4.733)^2 * sign(ROT_raw)
if rot_raw > 0:
rot_degrees = (rot_raw / 4.733) ** 2
rot = f"+{rot_degrees:.1f}"
else:
rot_degrees = (abs(rot_raw) / 4.733) ** 2
rot = f"-{rot_degrees:.1f}"
else:
rot = "+0.0"
# Speed over ground
sog_raw = extract_bits(binary_data, 50, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 60, 1)
# Position decoding
lon_raw = extract_signed_bits(binary_data, 61, 28)
lat_raw = extract_signed_bits(binary_data, 89, 27)
# check for valid longitude
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
# check for valid latitude
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
# Course over ground
cog_raw = extract_bits(binary_data, 116, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
# True heading
hdg_raw = extract_bits(binary_data, 128, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
# UTC second
utc_raw = extract_bits(binary_data, 137, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
# Communication state
raim = extract_bits(binary_data, 148, 1)
sync = extract_bits(binary_data, 149, 2)
slot = extract_bits(binary_data, 151, 3)
elif msg_type == 4 and len(binary_data) >= 168:
# Base station report
pos_accuracy = extract_bits(binary_data, 78, 1)
lon_raw = extract_signed_bits(binary_data, 79, 28)
lat_raw = extract_signed_bits(binary_data, 107, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
raim = extract_bits(binary_data, 148, 1)
elif msg_type == 9 and len(binary_data) >= 168:
# SAR aircraft position
sog_raw = extract_bits(binary_data, 50, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 60, 1)
lon_raw = extract_signed_bits(binary_data, 61, 28)
lat_raw = extract_signed_bits(binary_data, 89, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 116, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
utc_sec = extract_bits(binary_data, 128, 6)
raim = extract_bits(binary_data, 147, 1)
elif msg_type == 18 and len(binary_data) >= 168:
# Class B position report
sog_raw = extract_bits(binary_data, 46, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 56, 1)
lon_raw = extract_signed_bits(binary_data, 57, 28)
lat_raw = extract_signed_bits(binary_data, 85, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 112, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
hdg_raw = extract_bits(binary_data, 124, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
utc_raw = extract_bits(binary_data, 133, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
raim = extract_bits(binary_data, 147, 1)
sync = extract_bits(binary_data, 149, 2)
slot = extract_bits(binary_data, 151, 3)
elif msg_type == 19 and len(binary_data) >= 312:
# Extended Class B
sog_raw = extract_bits(binary_data, 46, 10)
if sog_raw == 1023:
sog = "0.0"
else:
sog = f"{sog_raw / 10.0:.1f}"
pos_accuracy = extract_bits(binary_data, 56, 1)
lon_raw = extract_signed_bits(binary_data, 57, 28)
lat_raw = extract_signed_bits(binary_data, 85, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
cog_raw = extract_bits(binary_data, 112, 12)
if cog_raw >= 3600:
cog = "360.0"
else:
cog = f"{cog_raw / 10.0:.1f}"
hdg_raw = extract_bits(binary_data, 124, 9)
if hdg_raw == 511:
heading = 511
else:
heading = hdg_raw
utc_raw = extract_bits(binary_data, 133, 6)
if utc_raw >= 60:
utc_sec = 60
else:
utc_sec = utc_raw
raim = extract_bits(binary_data, 305, 1)
elif msg_type == 21 and len(binary_data) >= 272:
# Aid to navigation
pos_accuracy = extract_bits(binary_data, 163, 1)
lon_raw = extract_signed_bits(binary_data, 164, 28)
lat_raw = extract_signed_bits(binary_data, 192, 27)
if lon_raw != 0x6791AC0:
lon_degrees = lon_raw / 600000.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0x3412140:
lat_degrees = lat_raw / 600000.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
utc_sec = extract_bits(binary_data, 253, 6)
raim = extract_bits(binary_data, 268, 1)
elif msg_type == 27 and len(binary_data) >= 96:
# Long range AIS
pos_accuracy = extract_bits(binary_data, 38, 1)
raim = extract_bits(binary_data, 39, 1)
nav_status = extract_bits(binary_data, 40, 4)
lon_raw = extract_signed_bits(binary_data, 44, 18)
lat_raw = extract_signed_bits(binary_data, 62, 17)
if lon_raw != 0x1A838:
lon_degrees = lon_raw / 600.0
longitude, lon_hem = format_lat_lon(lon_degrees, True)
if longitude is not None:
longitude = f"{longitude:.7f}"
if lat_raw != 0xD548:
lat_degrees = lat_raw / 600.0
latitude, lat_hem = format_lat_lon(lat_degrees, False)
if latitude is not None:
latitude = f"{latitude:.7f}"
sog_raw = extract_bits(binary_data, 79, 6)
if sog_raw == 63:
sog = "0.0"
else:
sog = f"{sog_raw:.1f}"
cog_raw = extract_bits(binary_data, 85, 9)
if cog_raw >= 360:
cog = "360.0"
else:
cog = f"{cog_raw:.1f}"
# return decoded values in correct order
return [
msg_type,
repeat_ind,
mmsi,
nav_status,
rot,
sog,
pos_accuracy,
longitude,
lon_hem,
latitude,
lat_hem,
cog,
heading,
utc_sec,
sync,
slot,
raim
]
# convert values to CSV format
def make_csv_line(decoded_values):
if not decoded_values:
return None
csv_parts = []
for value in decoded_values:
if value is None:
csv_parts.append('0')
else:
csv_parts.append(str(value))
return ','.join(csv_parts)
# process the input file
def process_ais_file(input_filename, output_filename=None):
if output_filename is None:
output_filename = input_filename.replace('.txt', '_decoded.txt')
try:
input_file = open(input_filename, 'r')
output_file = open(output_filename, 'w')
# write header
header = "message_type,repeat_indicator,mmsi,navigation_status,rate_of_turn,speed_over_ground,position_accuracy,longitude,lon_hemisphere,latitude,lat_hemisphere,course_over_ground,true_heading,utc_second,sync_state,slot_timeout,raim_flag"
output_file.write(header + '\n')
total_messages = 0
decoded_messages = 0
message_types = {}
messages_with_position = 0
for line in input_file:
line = line.strip()
if not line:
continue
total_messages += 1
result = decode_ais(line)
if result:
msg_type = result[0]
if msg_type in message_types:
message_types[msg_type] += 1
else:
message_types[msg_type] = 1
# check if message has position data
if result[7] is not None and result[9] is not None:
messages_with_position += 1
csv_line = make_csv_line(result)
if csv_line:
output_file.write(csv_line + '\n')
decoded_messages += 1
input_file.close()
output_file.close()
# print summary
print(f"Total messages processed: {total_messages}")
print(f"Successfully decoded: {decoded_messages}")
print(f"Messages with position data: {messages_with_position}")
print(f"\nMessage type summary:")
for msg_type in sorted(message_types.keys()):
print(f" Type {msg_type}: {message_types[msg_type]} messages")
print(f"\nDecoded data saved to: {output_filename}")
except FileNotFoundError:
print(f"Error: Could not find file {input_filename}")
except Exception as error:
print(f"Error occurred: {error}")
# debug function to test single message
def debug_single_message(nmea_msg):
payload = get_payload_from_nmea(nmea_msg)
if not payload:
print("Could not parse NMEA message")
return
binary = convert_payload_to_binary(payload)
print(f"Payload: {payload}")
print(f"Binary length: {len(binary)} bits")
msg_type = extract_bits(binary, 0, 6)
repeat = extract_bits(binary, 6, 2)
mmsi = extract_bits(binary, 8, 30)
print(f"Message type: {msg_type}")
print(f"Repeat: {repeat}")
print(f"MMSI: {mmsi}")
if msg_type in [1, 2, 3]:
print("Class A position report detected")
nav_stat = extract_bits(binary, 38, 4)
rot_raw = extract_signed_bits(binary, 42, 8)
sog_raw = extract_bits(binary, 50, 10)
pos_acc = extract_bits(binary, 60, 1)
lon_raw = extract_signed_bits(binary, 61, 28)
lat_raw = extract_signed_bits(binary, 89, 27)
print(f"Navigation status: {nav_stat}")
print(f"ROT raw: {rot_raw}")
print(f"SOG raw: {sog_raw}")
print(f"Position accuracy: {pos_acc}")
print(f"Longitude raw: {lon_raw}")
print(f"Latitude raw: {lat_raw}")
# main program
if __name__ == "__main__":
# test with sample message
test_nmea = "!AIVDM,1,1,,A,38IFDN0Ohj7JvbN0fABtpbJ401w@,0*69"
print("Testing decoder with sample message:")
print(f"Input: {test_nmea}")
debug_single_message(test_nmea)
decoded = decode_ais(test_nmea)
if decoded:
print(f"Decoded result: {make_csv_line(decoded)}")
else:
print("Decoding failed!")
print("\n" + "="*60 + "\n")
# process full file
input_path = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\nmea-sample"
output_path = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\NMEA_Decoded.txt"
print(f"Processing AIS messages from: {input_path}")
process_ais_file(input_path, output_path)
Testing decoder with sample message: Input: !AIVDM,1,1,,A,38IFDN0Ohj7JvbN0fABtpbJ401w@,0*69 Payload: 38IFDN0Ohj7JvbN0fABtpbJ401w@ Binary length: 168 bits Message type: 3 Repeat: 0 MMSI: 563451000 Class A position report detected Navigation status: 0 ROT raw: 127 SOG raw: 50 Position accuracy: 0 Longitude raw: 62256463 Latitude raw: 758091 Decoded result: 3,0,563451000,0,+127.0,5.0,0,103.7607717,E,1.2634850,N,329.8,333,2,0,0,0 ============================================================ Processing AIS messages from: C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\nmea-sample Total messages processed: 85194 Successfully decoded: 85109 Messages with position data: 76861 Message type summary: Type 0: 712 messages Type 1: 61418 messages Type 2: 2018 messages Type 3: 8817 messages Type 4: 4074 messages Type 5: 2240 messages Type 6: 458 messages Type 7: 5 messages Type 8: 1688 messages Type 9: 30 messages Type 10: 12 messages Type 11: 38 messages Type 12: 25 messages Type 13: 8 messages Type 14: 4 messages Type 15: 6 messages Type 16: 45 messages Type 17: 343 messages Type 18: 1387 messages Type 19: 219 messages Type 20: 323 messages Type 21: 404 messages Type 22: 29 messages Type 23: 6 messages Type 24: 338 messages Type 25: 2 messages Type 26: 4 messages Type 27: 7 messages Type 28: 1 messages Type 29: 3 messages Type 31: 2 messages Type 32: 33 messages Type 33: 70 messages Type 34: 47 messages Type 35: 19 messages Type 36: 16 messages Type 37: 12 messages Type 38: 5 messages Type 40: 41 messages Type 41: 3 messages Type 42: 1 messages Type 43: 6 messages Type 45: 4 messages Type 46: 1 messages Type 47: 7 messages Type 48: 31 messages Type 49: 16 messages Type 50: 22 messages Type 51: 17 messages Type 52: 21 messages Type 53: 18 messages Type 54: 1 messages Type 55: 4 messages Type 56: 32 messages Type 58: 1 messages Type 59: 6 messages Type 60: 2 messages Type 62: 4 messages Type 63: 3 messages Decoded data saved to: C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\NMEA_Decoded.txt
In [7]:
import pandas as pd
import folium
from IPython.display import display
# Load data
decoded_file = r"C:\Users\cxris\OneDrive\Desktop\VDES research\All_Decoded_L4_Messages\NMEA_Decoded.txt"
df = pd.read_csv(decoded_file)
# convert to numeric and fix hemisphere if needed
df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
if "lon_hemisphere" in df.columns:
df.loc[df["lon_hemisphere"].fillna("").str.upper() == "W", "longitude"] *= -1
if "lat_hemisphere" in df.columns:
df.loc[df["lat_hemisphere"].fillna("").str.upper() == "S", "latitude"] *= -1
# drop invalid coords
df = df.dropna(subset=["longitude", "latitude"])
df = df[(df["longitude"] != 0) & (df["latitude"] != 0)]
# keep last message per MMSI (latest timestamp)
df_plot = df.drop_duplicates(subset=["mmsi"], keep="last").copy()
# center map
center_lat, center_lon = df_plot["latitude"].mean(), df_plot["longitude"].mean()
m = folium.Map(location=[center_lat, center_lon], zoom_start=10)
# add markers with popup
for _, row in df_plot.iterrows():
popup_html = (
f"<b>MMSI:</b> {row['mmsi']}<br>"
f"<b>SOG:</b> {row.get('speed_over_ground','')} km<br>"
f"<b>COG:</b> {row.get('course_over_ground','')}°<br>"
f"<b>Heading:</b> {row.get('true_heading','')}°<br>"
f"<b>Lat:</b> {row['latitude']:.5f}°<br>"
f"<b>Lon:</b> {row['longitude']:.5f}°"
)
folium.CircleMarker(
location=[row["latitude"], row["longitude"]],
radius=5,
color="blue",
fill=True,
fill_opacity=0.8,
popup=folium.Popup(popup_html, max_width=300)
).add_to(m)
display(m)
# can save as html ltr on if needed:
# m.save("ais_map.html")
Make this Notebook Trusted to load map: File -> Trust Notebook